|  | /* | 
|  | Copyright 2016 The Transicator Authors | 
|  |  | 
|  | Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | you may not use this file except in compliance with the License. | 
|  | You may obtain a copy of the License at | 
|  |  | 
|  | http://www.apache.org/licenses/LICENSE-2.0 | 
|  |  | 
|  | Unless required by applicable law or agreed to in writing, software | 
|  | distributed under the License is distributed on an "AS IS" BASIS, | 
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | See the License for the specific language governing permissions and | 
|  | limitations under the License. | 
|  | */ | 
|  | package main | 
|  |  | 
|  | import ( | 
|  | "errors" | 
|  | "fmt" | 
|  | "net/http" | 
|  | "regexp" | 
|  | "strconv" | 
|  | "time" | 
|  |  | 
|  | "github.com/30x/goscaffold" | 
|  | log "github.com/Sirupsen/logrus" | 
|  | "github.com/apigee-labs/transicator/common" | 
|  | "github.com/apigee-labs/transicator/replication" | 
|  | "github.com/julienschmidt/httprouter" | 
|  | ) | 
|  |  | 
|  | const ( | 
|  | defaultLimit             = 100 | 
|  | maxLimitChanges          = 100000 | 
|  | changeSelectorValidChars = "^[0-9a-z_-]+$" | 
|  | ) | 
|  |  | 
|  | var emptySequence = common.Sequence{} | 
|  | var lowestPossibleSequence = common.MakeSequence(0, 1) | 
|  | var reChangeSelector = regexp.MustCompile(changeSelectorValidChars) | 
|  |  | 
|  | func (s *server) initChangesAPI(prefix string, router *httprouter.Router) { | 
|  | router.HandlerFunc("GET", prefix+"/changes", s.handleGetChanges) | 
|  | } | 
|  |  | 
|  | func (s *server) handleGetChanges(resp http.ResponseWriter, req *http.Request) { | 
|  | enc := goscaffold.SelectMediaType(req, []string{jsonContent, protoContent}) | 
|  | if enc == "" { | 
|  | sendAPIError(unsupportedFormat, "", resp, req) | 
|  | return | 
|  | } | 
|  |  | 
|  | q := req.URL.Query() | 
|  |  | 
|  | limit, err := getIntParam(q, "limit", defaultLimit) | 
|  | if err != nil { | 
|  | sendAPIError(invalidParameter, "limit", resp, req) | 
|  | return | 
|  | } | 
|  | if limit > maxLimitChanges { | 
|  | sendAPIError(invalidParameter, "limit too high, exceeds max "+strconv.Itoa(maxLimitChanges), resp, req) | 
|  | return | 
|  | } | 
|  |  | 
|  | block, err := getIntParam(q, "block", 0) | 
|  | if err != nil { | 
|  | sendAPIError(invalidParameter, "block", resp, req) | 
|  | return | 
|  | } | 
|  |  | 
|  | scopes, err := getCheckChangeSelectorParams(req) | 
|  | if err != nil { | 
|  | sendAPIError(invalidParameter, err.Error(), resp, req) | 
|  | return | 
|  | } | 
|  | if len(scopes) == 0 { | 
|  | // If no scope specified, replace with the empty scope | 
|  | scopes = []string{""} | 
|  | } | 
|  |  | 
|  | var sinceSeq common.Sequence | 
|  | since := q.Get("since") | 
|  | if since == "" { | 
|  | sinceSeq = emptySequence | 
|  | } else { | 
|  | sinceSeq, err = common.ParseSequence(since) | 
|  | if err != nil { | 
|  | sendAPIError(invalidParameter, "since", resp, req) | 
|  | return | 
|  | } | 
|  | } | 
|  |  | 
|  | var snapshotFilter func([]byte) bool | 
|  | snapStr := q.Get("snapshot") | 
|  | if snapStr != "" { | 
|  | var snapshot *replication.Snapshot | 
|  | snapshot, err = replication.MakeSnapshot(snapStr) | 
|  | if err != nil { | 
|  | sendAPIError(invalidParameter, "snapshot", resp, req) | 
|  | return | 
|  | } | 
|  | snapshotFilter = makeSnapshotFilter(snapshot) | 
|  | } | 
|  |  | 
|  | // Need to advance past a single "since" value | 
|  | sinceSeq.Index++ | 
|  |  | 
|  | firstSeq, lastSeq, entries, success := | 
|  | s.receiveChanges(scopes, sinceSeq, limit, snapshotFilter, resp, req) | 
|  | if !success { | 
|  | return | 
|  | } | 
|  |  | 
|  | if len(entries) == 0 && block > 0 { | 
|  | // Query -- which was consistent at the "snapshot" level -- didn't | 
|  | // return anything. Wait until something is put in the database and try again. | 
|  | waitSeq := lastSeq | 
|  | waitSeq.Index++ | 
|  |  | 
|  | log.Debugf("Blocking at %s for up to %d seconds", waitSeq, block) | 
|  | newIndex := s.tracker.timedWait(waitSeq, time.Duration(block)*time.Second, scopes) | 
|  |  | 
|  | if newIndex.Compare(sinceSeq) > 0 { | 
|  | firstSeq, lastSeq, entries, success = | 
|  | s.receiveChanges(scopes, sinceSeq, limit, snapshotFilter, resp, req) | 
|  | if !success { | 
|  | return | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | changeList := common.ChangeList{ | 
|  | FirstSequence: firstSeq.String(), | 
|  | LastSequence:  lastSeq.String(), | 
|  | } | 
|  |  | 
|  | for _, e := range entries { | 
|  | change, err := decodeChangeProto(e) | 
|  | if err != nil { | 
|  | sendAPIError(serverError, | 
|  | fmt.Sprintf("Invalid data in database: %s", err), resp, req) | 
|  | } | 
|  | // Database doesn't have value of "Sequence" in it | 
|  | change.Sequence = change.GetSequence().String() | 
|  | changeList.Changes = append(changeList.Changes, *change) | 
|  | } | 
|  |  | 
|  | // Important to return an intermediate sequence if we ran up against the limit | 
|  | if len(entries) == limit && limit > 0 { | 
|  | changeList.LastSequence = changeList.Changes[len(entries)-1].Sequence | 
|  | } | 
|  |  | 
|  | switch enc { | 
|  | case jsonContent: | 
|  | resp.Header().Set("Content-Type", jsonContent) | 
|  | resp.Write(changeList.Marshal()) | 
|  | case protoContent: | 
|  | resp.Header().Set("Content-Type", protoContent) | 
|  | resp.Write(changeList.MarshalProto()) | 
|  | default: | 
|  | panic("Got to an unsupported media type") | 
|  | } | 
|  | } | 
|  |  | 
|  | func (s *server) receiveChanges( | 
|  | scopes []string, sinceSeq common.Sequence, | 
|  | limit int, filter func([]byte) bool, | 
|  | resp http.ResponseWriter, req *http.Request) (firstSeq, lastSeq common.Sequence, entries [][]byte, success bool) { | 
|  |  | 
|  | log.Debugf("Receiving changes: scopes = %v since = %s limit = %d", | 
|  | scopes, sinceSeq, limit) | 
|  |  | 
|  | var err error | 
|  | entries, firstSeq, lastSeq, err = s.db.Scan( | 
|  | scopes, sinceSeq.LSN, sinceSeq.Index, limit, filter) | 
|  |  | 
|  | if err != nil { | 
|  | sendAPIError(serverError, err.Error(), resp, req) | 
|  | return | 
|  | } | 
|  | if sinceSeq.Compare(firstSeq) < 0 && sinceSeq.Compare(lowestPossibleSequence) > 0 { | 
|  | // "since" parameter specified and too old. Need to return an error. | 
|  | log.Debugf("since value of %s is too old compared to %s\n", | 
|  | sinceSeq, firstSeq) | 
|  | sendAPIError(snapshotOld, "", resp, req) | 
|  | return | 
|  | } | 
|  |  | 
|  | log.Debugf("Received %d changes", len(entries)) | 
|  | success = true | 
|  | return | 
|  | } | 
|  |  | 
|  | /* | 
|  | getChangeSelectorParams combines all 'scope' query | 
|  | params into one slice after checking for valid characters. | 
|  | */ | 
|  | func getCheckChangeSelectorParams(r *http.Request) ([]string, error) { | 
|  | scopes := r.URL.Query()["scope"] | 
|  | for _, s := range scopes { | 
|  | if !reChangeSelector.MatchString(s) { | 
|  | return nil, errors.New("Invalid char in scope param") | 
|  | } | 
|  | } | 
|  | selectors := r.URL.Query()["selector"] | 
|  | for _, s := range selectors { | 
|  | if !reChangeSelector.MatchString(s) { | 
|  | return nil, errors.New("Invalid char in selector param") | 
|  | } | 
|  | } | 
|  | return append(scopes, selectors...), nil | 
|  | } | 
|  |  | 
|  | func makeSnapshotFilter(ss *replication.Snapshot) func([]byte) bool { | 
|  | return func(buf []byte) bool { | 
|  | txid, err := decodeChangeTXID(buf) | 
|  | if err == nil { | 
|  | return !ss.Contains(txid) | 
|  | } | 
|  | return false | 
|  | } | 
|  | } |