blob: c355a6152abc1bb01021557622ef052b0eea476f [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidApigeeSync
import (
"bytes"
"encoding/json"
"github.com/apid/apid-core/util"
"io/ioutil"
"net/http"
"net/url"
"path"
"sync/atomic"
"time"
)
var (
refreshFloatTime = time.Minute
)
/*
Usage:
man := createTokenManager()
bearer := man.getBearerToken()
will automatically update config(configBearerToken) for other modules
*/
func createApidTokenManager(isNewInstance bool) *apidTokenManager {
isClosedInt := int32(0)
t := &apidTokenManager{
quitPollingForToken: make(chan bool, 1),
closed: make(chan bool),
getTokenChan: make(chan bool),
invalidateTokenChan: make(chan bool),
returnTokenChan: make(chan *OauthToken),
invalidateDone: make(chan bool),
tokenUpdatedChan: make(chan bool, 1),
isClosed: &isClosedInt,
isNewInstance: isNewInstance,
}
return t
}
type apidTokenManager struct {
token *OauthToken
isClosed *int32
quitPollingForToken chan bool
closed chan bool
getTokenChan chan bool
invalidateTokenChan chan bool
refreshTimer <-chan time.Time
returnTokenChan chan *OauthToken
invalidateDone chan bool
tokenUpdatedChan chan bool
isNewInstance bool
}
func (t *apidTokenManager) start() {
t.retrieveNewToken()
t.refreshTimer = time.After(t.token.refreshIn())
go t.maintainToken()
}
func (t *apidTokenManager) getBearerToken() string {
return t.getToken().AccessToken
}
func (t *apidTokenManager) maintainToken() {
for {
select {
case <-t.closed:
return
case <-t.refreshTimer:
log.Debug("auto refresh token")
t.retrieveNewToken()
t.refreshTimer = time.After(t.token.refreshIn())
case <-t.getTokenChan:
token := t.token
t.returnTokenChan <- token
case <-t.invalidateTokenChan:
t.retrieveNewToken()
t.refreshTimer = time.After(t.token.refreshIn())
t.invalidateDone <- true
}
}
}
// will block until valid
func (t *apidTokenManager) invalidateToken() {
log.Debug("invalidating token")
t.invalidateTokenChan <- true
<-t.invalidateDone
}
func (t *apidTokenManager) getToken() *OauthToken {
//has been closed
if atomic.LoadInt32(t.isClosed) == int32(1) {
log.Debug("TokenManager: getToken() called on closed tokenManager")
return nil
}
t.getTokenChan <- true
return <-t.returnTokenChan
}
/*
* blocking close() of tokenMan
*/
func (t *apidTokenManager) close() {
//has been closed
if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
log.Panic("TokenManager: close() has been called before!")
return
}
log.Debug("close token manager")
t.quitPollingForToken <- true
// sending instead of closing, to make sure it enters the t.doRefresh branch
t.closed <- true
close(t.closed)
log.Debug("token manager closed")
}
// don't call externally. will block until success.
func (t *apidTokenManager) retrieveNewToken() {
log.Debug("Getting OAuth token...")
uriString := config.GetString(configProxyServerBaseURI)
uri, err := url.Parse(uriString)
if err != nil {
log.Panicf("unable to parse uri config '%s' value: '%s': %v", configProxyServerBaseURI, uriString, err)
}
uri.Path = path.Join(uri.Path, "/accesstoken")
pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : %v", err) })
}
func (t *apidTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
return func(_ chan bool) error {
form := url.Values{}
form.Set("grant_type", "client_credentials")
form.Add("client_id", config.GetString(configConsumerKey))
form.Add("client_secret", config.GetString(configConsumerSecret))
req, err := http.NewRequest("POST", uri.String(), bytes.NewBufferString(form.Encode()))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
req.Header.Set("display_name", apidInfo.InstanceName)
req.Header.Set("apid_instance_id", apidInfo.InstanceID)
req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
req.Header.Set("status", "ONLINE")
req.Header.Set("plugin_details", apidPluginDetails)
if t.isNewInstance {
req.Header.Set("created_at_apid", time.Now().Format(time.RFC3339))
t.isNewInstance = false
} else {
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
}
client := &http.Client{
Transport: util.Transport(config.GetString(util.ConfigfwdProxyPortURL)),
Timeout: httpTimeout,
}
resp, err := client.Do(req)
if err != nil {
log.Errorf("Unable to Connect to Edge Proxy Server: %v", err)
return err
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Errorf("Unable to read EdgeProxy Sever response: %v", err)
return err
}
if resp.StatusCode != 200 {
log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body))
return expected200Error
}
var token OauthToken
err = json.Unmarshal(body, &token)
if err != nil {
log.Errorf("unable to unmarshal JSON response '%s': %v", string(body), err)
return err
}
if token.ExpiresIn > 0 {
token.ExpiresAt = time.Now().Add(time.Duration(token.ExpiresIn) * time.Second)
} else {
// no expiration, arbitrarily expire about a year from now
token.ExpiresAt = time.Now().Add(365 * 24 * time.Hour)
}
log.Debugf("Got new token: %#v", token)
t.token = &token
config.Set(configBearerToken, token.AccessToken)
//don't block on the buffered channel. that means there is already a signal to serve new token
//TODO: This assumes apid-gateway is 1-1 mapping. Make use of generic long-polling provided by apid-core
select {
case t.tokenUpdatedChan <- true:
default:
log.Debug("Token refresh notification already sent")
}
return nil
}
}
func (t *apidTokenManager) getTokenReadyChannel() <-chan bool {
return t.tokenUpdatedChan
}
type OauthToken struct {
IssuedAt int64 `json:"issuedAt"`
AppName string `json:"applicationName"`
Scope string `json:"scope"`
Status string `json:"status"`
ApiProdList []string `json:"apiProductList"`
// in seconds
ExpiresIn int64 `json:"expiresIn"`
DeveloperEmail string `json:"developerEmail"`
TokenType string `json:"tokenType"`
ClientId string `json:"clientId"`
AccessToken string `json:"accessToken"`
RefreshExpIn int64 `json:"refreshTokenExpiresIn"`
RefreshCount int64 `json:"refreshCount"`
ExpiresAt time.Time
}
var noTime time.Time
func (t *OauthToken) isValid() bool {
if t == nil || t.AccessToken == "" {
return false
}
return t.AccessToken != "" && time.Now().Before(t.ExpiresAt)
}
func (t *OauthToken) refreshIn() time.Duration {
if t == nil || t.ExpiresAt == noTime {
return time.Duration(0)
}
return t.ExpiresAt.Sub(time.Now()) - refreshFloatTime
}
func (t *OauthToken) needsRefresh() bool {
if t == nil || t.ExpiresAt == noTime {
return true
}
return time.Now().Add(refreshFloatTime).After(t.ExpiresAt)
}