| package apidApigeeSync |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "os" |
| "time" |
| |
| "github.com/30x/apid-core" |
| ) |
| |
| const ( |
| configPollInterval = "apigeesync_poll_interval" |
| configProxyServerBaseURI = "apigeesync_proxy_server_base" |
| configSnapServerBaseURI = "apigeesync_snapshot_server_base" |
| configChangeServerBaseURI = "apigeesync_change_server_base" |
| configConsumerKey = "apigeesync_consumer_key" |
| configConsumerSecret = "apigeesync_consumer_secret" |
| configApidClusterId = "apigeesync_cluster_id" |
| configSnapshotProtocol = "apigeesync_snapshot_proto" |
| configName = "apigeesync_instance_name" |
| ApigeeSyncEventSelector = "ApigeeSync" |
| |
| // special value - set by ApigeeSync, not taken from configuration |
| configApidInstanceID = "apigeesync_apid_instance_id" |
| // This will not be needed once we have plugin handling tokens. |
| configBearerToken = "apigeesync_bearer_token" |
| ) |
| |
| var ( |
| /* All set during plugin initialization */ |
| log apid.LogService |
| config apid.ConfigService |
| dataService apid.DataService |
| events apid.EventsService |
| apidInfo apidInstanceInfo |
| newInstanceID bool |
| tokenManager *tokenMan |
| changeManager *pollChangeManager |
| snapManager *snapShotManager |
| httpclient *http.Client |
| |
| /* Set during post plugin initialization |
| * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called |
| */ |
| apidPluginDetails string = `[{"name":"apidApigeeSync","schemaVer":"1.0"}]` |
| ) |
| |
| type apidInstanceInfo struct { |
| InstanceID, InstanceName, ClusterID, LastSnapshot string |
| } |
| |
| type pluginDetail struct { |
| Name string `json:"name"` |
| SchemaVersion string `json:"schemaVer"` |
| } |
| |
| func init() { |
| apid.RegisterPlugin(initPlugin) |
| } |
| |
| func initConfigDefaults() { |
| config.SetDefault(configPollInterval, 120*time.Second) |
| config.SetDefault(configSnapshotProtocol, "json") |
| name, errh := os.Hostname() |
| if (errh != nil) && (len(config.GetString(configName)) == 0) { |
| log.Errorf("Not able to get hostname for kernel. Please set '%s' property in config", configName) |
| name = "Undefined" |
| } |
| config.SetDefault(configName, name) |
| log.Debugf("Using %s as display name", config.GetString(configName)) |
| } |
| |
| func initVariables(services apid.Services) error { |
| dataService = services.Data() |
| events = services.Events() |
| |
| tr := &http.Transport{ |
| MaxIdleConnsPerHost: maxIdleConnsPerHost, |
| } |
| httpclient = &http.Client{ |
| Transport: tr, |
| Timeout: httpTimeout, |
| CheckRedirect: func(req *http.Request, _ []*http.Request) error { |
| req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken()) |
| return nil |
| }, |
| } |
| |
| //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines |
| //also useful for testing |
| snapManager = createSnapShotManager() |
| changeManager = createChangeManager() |
| |
| // set up default database |
| db, err := dataService.DB() |
| if err != nil { |
| return fmt.Errorf("Unable to access DB: %v", err) |
| } |
| err = initDB(db) |
| if err != nil { |
| return fmt.Errorf("Unable to access DB: %v", err) |
| } |
| setDB(db) |
| |
| apidInfo, err = getApidInstanceInfo() |
| if err != nil { |
| return fmt.Errorf("Unable to get apid instance info: %v", err) |
| } |
| |
| if config.IsSet(configApidInstanceID) { |
| log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID) |
| } |
| config.Set(configApidInstanceID, apidInfo.InstanceID) |
| |
| return nil |
| } |
| |
| func checkForRequiredValues() error { |
| // check for required values |
| for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret, |
| configSnapServerBaseURI, configChangeServerBaseURI} { |
| if !config.IsSet(key) { |
| return fmt.Errorf("Missing required config value: %s", key) |
| } |
| } |
| proto := config.GetString(configSnapshotProtocol) |
| if proto != "json" && proto != "proto" { |
| return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol) |
| } |
| |
| return nil |
| } |
| |
| func SetLogger(logger apid.LogService) { |
| log = logger |
| } |
| |
| /* Idempotent state initialization */ |
| func _initPlugin(services apid.Services) error { |
| SetLogger(services.Log().ForModule("apigeeSync")) |
| log.Debug("start init") |
| |
| config = services.Config() |
| initConfigDefaults() |
| |
| err := checkForRequiredValues() |
| if err != nil { |
| return err |
| } |
| |
| err = initVariables(services) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func initPlugin(services apid.Services) (apid.PluginData, error) { |
| |
| err := _initPlugin(services) |
| if err != nil { |
| return pluginData, err |
| } |
| |
| /* This callback function will get called once all the plugins are |
| * initialized (not just this plugin). This is needed because, |
| * downloadSnapshots/changes etc have to begin to be processed only |
| * after all the plugins are initialized |
| */ |
| events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins) |
| |
| log.Debug("end init") |
| |
| return pluginData, nil |
| } |
| |
| // Plugins have all initialized, gather their info and start the ApigeeSync downloads |
| func postInitPlugins(event apid.Event) { |
| var plinfoDetails []pluginDetail |
| if pie, ok := event.(apid.PluginsInitializedEvent); ok { |
| /* |
| * Store the plugin details in the heap. Needed during |
| * Bearer token generation request. |
| */ |
| for _, plugin := range pie.Plugins { |
| name := plugin.Name |
| version := plugin.Version |
| if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok { |
| inf := pluginDetail{ |
| Name: name, |
| SchemaVersion: schemaVersion} |
| plinfoDetails = append(plinfoDetails, inf) |
| log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion) |
| } |
| } |
| if plinfoDetails == nil { |
| log.Panicf("No Plugins registered!") |
| } |
| |
| pgInfo, err := json.Marshal(plinfoDetails) |
| if err != nil { |
| log.Panicf("Unable to marshal plugin data: %v", err) |
| } |
| apidPluginDetails = string(pgInfo[:]) |
| |
| log.Debug("start post plugin init") |
| |
| tokenManager = createTokenManager() |
| |
| go bootstrap() |
| |
| events.Listen(ApigeeSyncEventSelector, &handler{}) |
| log.Debug("Done post plugin init") |
| } |
| } |