aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Edgecumbe <git@esotericnonsense.com>2019-10-27 21:14:45 +0100
committerDaniel Edgecumbe <git@esotericnonsense.com>2019-10-27 21:14:45 +0100
commit5491bd44a61a5d24bec7c9c5ddd531f1809364a1 (patch)
tree561176ad7610be64c9db17315581370585d17327
parent31e4578f52f9cc6d12d4c371973d2868ace52116 (diff)
Working keep-alive implementation
-rw-r--r--src/client.rs168
-rw-r--r--src/result.rs1
2 files changed, 119 insertions, 50 deletions
diff --git a/src/client.rs b/src/client.rs
index 398b01d..b006dd1 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -78,7 +78,7 @@ impl BFCredentials {
pub struct BFClient {
client: reqwest::Client,
destructor: mpsc::SyncSender<()>,
- session_token: RwLock<Option<String>>,
+ session_token: Arc<RwLock<Option<String>>>,
creds: BFCredentials,
proxy_uri: Option<String>,
}
@@ -105,22 +105,18 @@ impl BFClient {
None => reqwest::Client::new(),
};
- let session_token = RwLock::new(None);
- let (destructor, rx) = mpsc::sync_channel(0); // rendezvous channel
- thread::spawn(move || {
- info!("New keepalive thread spawned for BFClient");
- loop {
- match rx.recv_timeout(Duration::from_millis(1000)) {
- Ok(_) => {
- warn!("got destructor signal, thread finishing");
- break;
- }
- Err(_) => {
- info!("thread still running");
- }
- }
- }
- });
+ let session_token = Arc::new(RwLock::new(None));
+
+ let destructor = {
+ let session_token = session_token.clone();
+ let proxy_uri = proxy_uri.clone();
+ let (tx, rx) = mpsc::sync_channel(0); // rendezvous channel
+ thread::spawn(|| {
+ Self::keepalive_thread(session_token, proxy_uri, rx)
+ });
+ tx
+ };
+
Ok(Arc::new(BFClient {
client,
destructor,
@@ -130,39 +126,46 @@ impl BFClient {
}))
}
- // TODO keepalive
- // https://identitysso.betfair.com/api/keepAlive
- // Accept (mandatory)
- // Header that signals that the response should be returned as JSON application/json
- // X-Authentication (mandatory)
- // Header that represents the session token that needs to be keep alive Session Token
- // X-Application (optional)
- // Header the Application Key used by the customer to identify the product. App Key
- // Response structure
- //
- //
- // {
- // "token":"<token_passed_as_header>",
- // "product":"product_passed_as_header",
- // "status":"<status>",
- // "error":"<error>"
- // }
- // Status values
- //
- //
- // SUCCESS
- // FAIL
- // Error values
- //
- //
- // INPUT_VALIDATION_ERROR
- // INTERNAL_ERROR
- // NO_SESSION
-
- // general notes
- // We would therefore recommend that all Betfair API request are sent with the ‘Accept-Encoding: gzip, deflate’ request header.
- // We recommend that Connection: keep-alive header is set for all requests to guarantee a persistent connection and therefore reducing latency. Please note: Idle keep-alive connection to the API endpoints are closed every 3 minutes.
- // You should ensure that you handle the INVALID_SESSION_TOKEN error within your code by creating a new session token via the API login method.
+ fn keepalive_thread(
+ session_token: Arc<RwLock<Option<String>>>,
+ proxy_uri: Option<String>,
+ rx: mpsc::Receiver<()>,
+ ) {
+ info!("New keepalive thread spawned for BFClient");
+ loop {
+ match rx.recv_timeout(Duration::from_millis(5000)) {
+ Ok(_) => {
+ warn!("got destructor signal, thread finishing");
+ break;
+ }
+ Err(_) => {
+ let maybe_token: Option<String> = session_token
+ .read()
+ .expect(
+ "keepalive thread could not lock session token",
+ )
+ .clone();
+ match maybe_token {
+ None => {
+ debug!("no keepalive required as no token");
+ }
+ Some(token) => {
+ warn!("attempting keepalive");
+ match keepalive(&token, &proxy_uri) {
+ Ok(()) => {
+ info!("successful keepalive");
+ }
+ Err(e) => {
+ warn!("keepalive failed: {:?}", e);
+ // TODO login
+ }
+ };
+ }
+ }
+ }
+ };
+ }
+ }
fn req_internal<T1: Serialize, T2: DeserializeOwned>(
&self,
@@ -268,3 +271,68 @@ impl BFClient {
}
}
}
+
+fn keepalive(token: &String, proxy_uri: &Option<String>) -> Result<()> {
+ const KEEPALIVE_URI: &str =
+ "https://identitysso.betfair.com/api/keepAlive";
+
+ let client: Client = match proxy_uri {
+ Some(uri) => {
+ let proxy = reqwest::Proxy::all(uri)?;
+ Client::builder().proxy(proxy).build()?
+ }
+ None => Client::new(),
+ };
+
+ info!("KeepAliveRequest ...");
+ let keep_alive_response: KeepAliveResponse = client
+ .get(KEEPALIVE_URI)
+ .header("Accept", "application/json")
+ .header(
+ "X-Application",
+ format!("schroedinger_{}", rand::random::<u128>()),
+ )
+ .header("X-Authentication", token)
+ .send()?
+ .json()?;
+
+ match keep_alive_response.status {
+ KeepAliveStatus::SUCCESS => {
+ info!("KeepAliveResponse was successful");
+ Ok(())
+ }
+ KeepAliveStatus::FAIL => {
+ warn!("KeepAliveResponse: {:?}", keep_alive_response.error);
+ Err(Error::BFKeepAliveFailure(
+ keep_alive_response.error.unwrap(),
+ ))
+ }
+ }
+}
+
+#[derive(Debug, Deserialize)]
+#[allow(non_camel_case_types)]
+pub enum KeepAliveError {
+ // TODO should this really be public?
+ #[serde(rename = "")]
+ NONE,
+ INPUT_VALIDATION_ERROR,
+ INTERNAL_ERROR,
+ NO_SESSION,
+}
+
+#[derive(Debug, Deserialize)]
+#[allow(non_camel_case_types)]
+enum KeepAliveStatus {
+ SUCCESS,
+ FAIL,
+}
+
+#[derive(Debug, Deserialize)]
+#[allow(non_snake_case)]
+struct KeepAliveResponse {
+ token: String,
+ product: String,
+ status: KeepAliveStatus,
+ error: Option<KeepAliveError>,
+}
diff --git a/src/result.rs b/src/result.rs
index 55f0f1a..285cea5 100644
--- a/src/result.rs
+++ b/src/result.rs
@@ -19,6 +19,7 @@ pub enum Error {
Io(std::io::Error),
Reqwest(reqwest::Error),
BFLoginFailure(String),
+ BFKeepAliveFailure(crate::client::KeepAliveError), // could be an enum
General(String),
Other,
}