diff options
author | Daniel Edgecumbe <git@esotericnonsense.com> | 2019-10-27 21:14:45 +0100 |
---|---|---|
committer | Daniel Edgecumbe <git@esotericnonsense.com> | 2019-10-27 21:14:45 +0100 |
commit | 5491bd44a61a5d24bec7c9c5ddd531f1809364a1 (patch) | |
tree | 561176ad7610be64c9db17315581370585d17327 | |
parent | 31e4578f52f9cc6d12d4c371973d2868ace52116 (diff) |
Working keep-alive implementation
-rw-r--r-- | src/client.rs | 168 | ||||
-rw-r--r-- | src/result.rs | 1 |
2 files changed, 119 insertions, 50 deletions
diff --git a/src/client.rs b/src/client.rs index 398b01d..b006dd1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -78,7 +78,7 @@ impl BFCredentials { pub struct BFClient { client: reqwest::Client, destructor: mpsc::SyncSender<()>, - session_token: RwLock<Option<String>>, + session_token: Arc<RwLock<Option<String>>>, creds: BFCredentials, proxy_uri: Option<String>, } @@ -105,22 +105,18 @@ impl BFClient { None => reqwest::Client::new(), }; - let session_token = RwLock::new(None); - let (destructor, rx) = mpsc::sync_channel(0); // rendezvous channel - thread::spawn(move || { - info!("New keepalive thread spawned for BFClient"); - loop { - match rx.recv_timeout(Duration::from_millis(1000)) { - Ok(_) => { - warn!("got destructor signal, thread finishing"); - break; - } - Err(_) => { - info!("thread still running"); - } - } - } - }); + let session_token = Arc::new(RwLock::new(None)); + + let destructor = { + let session_token = session_token.clone(); + let proxy_uri = proxy_uri.clone(); + let (tx, rx) = mpsc::sync_channel(0); // rendezvous channel + thread::spawn(|| { + Self::keepalive_thread(session_token, proxy_uri, rx) + }); + tx + }; + Ok(Arc::new(BFClient { client, destructor, @@ -130,39 +126,46 @@ impl BFClient { })) } - // TODO keepalive - // https://identitysso.betfair.com/api/keepAlive - // Accept (mandatory) - // Header that signals that the response should be returned as JSON application/json - // X-Authentication (mandatory) - // Header that represents the session token that needs to be keep alive Session Token - // X-Application (optional) - // Header the Application Key used by the customer to identify the product. App Key - // Response structure - // - // - // { - // "token":"<token_passed_as_header>", - // "product":"product_passed_as_header", - // "status":"<status>", - // "error":"<error>" - // } - // Status values - // - // - // SUCCESS - // FAIL - // Error values - // - // - // INPUT_VALIDATION_ERROR - // INTERNAL_ERROR - // NO_SESSION - - // general notes - // We would therefore recommend that all Betfair API request are sent with the ‘Accept-Encoding: gzip, deflate’ request header. - // We recommend that Connection: keep-alive header is set for all requests to guarantee a persistent connection and therefore reducing latency. Please note: Idle keep-alive connection to the API endpoints are closed every 3 minutes. - // You should ensure that you handle the INVALID_SESSION_TOKEN error within your code by creating a new session token via the API login method. + fn keepalive_thread( + session_token: Arc<RwLock<Option<String>>>, + proxy_uri: Option<String>, + rx: mpsc::Receiver<()>, + ) { + info!("New keepalive thread spawned for BFClient"); + loop { + match rx.recv_timeout(Duration::from_millis(5000)) { + Ok(_) => { + warn!("got destructor signal, thread finishing"); + break; + } + Err(_) => { + let maybe_token: Option<String> = session_token + .read() + .expect( + "keepalive thread could not lock session token", + ) + .clone(); + match maybe_token { + None => { + debug!("no keepalive required as no token"); + } + Some(token) => { + warn!("attempting keepalive"); + match keepalive(&token, &proxy_uri) { + Ok(()) => { + info!("successful keepalive"); + } + Err(e) => { + warn!("keepalive failed: {:?}", e); + // TODO login + } + }; + } + } + } + }; + } + } fn req_internal<T1: Serialize, T2: DeserializeOwned>( &self, @@ -268,3 +271,68 @@ impl BFClient { } } } + +fn keepalive(token: &String, proxy_uri: &Option<String>) -> Result<()> { + const KEEPALIVE_URI: &str = + "https://identitysso.betfair.com/api/keepAlive"; + + let client: Client = match proxy_uri { + Some(uri) => { + let proxy = reqwest::Proxy::all(uri)?; + Client::builder().proxy(proxy).build()? + } + None => Client::new(), + }; + + info!("KeepAliveRequest ..."); + let keep_alive_response: KeepAliveResponse = client + .get(KEEPALIVE_URI) + .header("Accept", "application/json") + .header( + "X-Application", + format!("schroedinger_{}", rand::random::<u128>()), + ) + .header("X-Authentication", token) + .send()? + .json()?; + + match keep_alive_response.status { + KeepAliveStatus::SUCCESS => { + info!("KeepAliveResponse was successful"); + Ok(()) + } + KeepAliveStatus::FAIL => { + warn!("KeepAliveResponse: {:?}", keep_alive_response.error); + Err(Error::BFKeepAliveFailure( + keep_alive_response.error.unwrap(), + )) + } + } +} + +#[derive(Debug, Deserialize)] +#[allow(non_camel_case_types)] +pub enum KeepAliveError { + // TODO should this really be public? + #[serde(rename = "")] + NONE, + INPUT_VALIDATION_ERROR, + INTERNAL_ERROR, + NO_SESSION, +} + +#[derive(Debug, Deserialize)] +#[allow(non_camel_case_types)] +enum KeepAliveStatus { + SUCCESS, + FAIL, +} + +#[derive(Debug, Deserialize)] +#[allow(non_snake_case)] +struct KeepAliveResponse { + token: String, + product: String, + status: KeepAliveStatus, + error: Option<KeepAliveError>, +} diff --git a/src/result.rs b/src/result.rs index 55f0f1a..285cea5 100644 --- a/src/result.rs +++ b/src/result.rs @@ -19,6 +19,7 @@ pub enum Error { Io(std::io::Error), Reqwest(reqwest::Error), BFLoginFailure(String), + BFKeepAliveFailure(crate::client::KeepAliveError), // could be an enum General(String), Other, } |