|  | /* | 
|  | Copyright 2016 The Transicator Authors | 
|  |  | 
|  | Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | you may not use this file except in compliance with the License. | 
|  | You may obtain a copy of the License at | 
|  |  | 
|  | http://www.apache.org/licenses/LICENSE-2.0 | 
|  |  | 
|  | Unless required by applicable law or agreed to in writing, software | 
|  | distributed under the License is distributed on an "AS IS" BASIS, | 
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | See the License for the specific language governing permissions and | 
|  | limitations under the License. | 
|  | */ | 
|  | package common | 
|  |  | 
|  | import ( | 
|  | "encoding/binary" | 
|  | "errors" | 
|  | "io" | 
|  |  | 
|  | "github.com/golang/protobuf/proto" | 
|  | ) | 
|  |  | 
/*
A SnapshotWriter allows a snapshot to be constructed by writing to it one
row and one table at a time. It contains some sanity checks to make sure
that the writer uses it appropriately but in general it's important to
be careful.
*/
type SnapshotWriter struct {
	// writer receives every record as a length prefix followed by the
	// marshaled protobuf bytes.
	writer io.Writer
	// tableWriting is true between a call to StartTable and the matching
	// call to EndTable.
	tableWriting bool
	// numColumns is the number of columns passed to the most recent
	// StartTable; WriteRow rejects rows of any other length.
	numColumns int
}
|  |  | 
|  | /* | 
|  | CreateSnapshotWriter creates a new SnapshotWriter, with the specified | 
|  | Postgres timestap (from "now()") and snapshot specification | 
|  | (from "txid_current_snapshot()"). | 
|  | */ | 
|  | func CreateSnapshotWriter(timestamp, snapshotInfo string, | 
|  | writer io.Writer) (*SnapshotWriter, error) { | 
|  | hdr := &SnapshotHeaderPb{ | 
|  | Timestamp: proto.String(timestamp), | 
|  | Snapshot:  proto.String(snapshotInfo), | 
|  | } | 
|  |  | 
|  | w := &SnapshotWriter{ | 
|  | writer: writer, | 
|  | } | 
|  |  | 
|  | err := w.writeProto(hdr) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | return w, nil | 
|  | } | 
|  |  | 
|  | /* | 
|  | StartTable tells the reader that it's time to start work on a new table. | 
|  | It is an error to start a table when a previous table has not been ended. | 
|  | */ | 
|  | func (w *SnapshotWriter) StartTable(tableName string, cols []ColumnInfo) error { | 
|  | if w.tableWriting { | 
|  | return errors.New("Cannot start a new table because last one isn't finished") | 
|  | } | 
|  | w.tableWriting = true | 
|  | w.numColumns = len(cols) | 
|  |  | 
|  | var colPb []*ColumnPb | 
|  | for _, col := range cols { | 
|  | newCol := &ColumnPb{ | 
|  | Name: proto.String(col.Name), | 
|  | Type: proto.Int32(col.Type), | 
|  | } | 
|  | colPb = append(colPb, newCol) | 
|  | } | 
|  |  | 
|  | table := &TableHeaderPb{ | 
|  | Name:    proto.String(tableName), | 
|  | Columns: colPb, | 
|  | } | 
|  | tableMsg := &StreamMessagePb_Table{ | 
|  | Table: table, | 
|  | } | 
|  | msg := &StreamMessagePb{ | 
|  | Message: tableMsg, | 
|  | } | 
|  | return w.writeProto(msg) | 
|  | } | 
|  |  | 
|  | /* | 
|  | EndTable ends data for the current table. It is an error to end the table when | 
|  | StartTable was not called. | 
|  | */ | 
|  | func (w *SnapshotWriter) EndTable() error { | 
|  | if !w.tableWriting { | 
|  | return errors.New("Cannot end a table because none has started") | 
|  | } | 
|  | w.tableWriting = false | 
|  | return nil | 
|  | } | 
|  |  | 
|  | /* | 
|  | WriteRow writes the values of a single column. It is an error to call this | 
|  | if StartTable was not called. It is also an error if the length of | 
|  | "columnValues" does not match the list of names passed to "StartTable." | 
|  | Values must be primitive types: | 
|  | * integer types | 
|  | * float types | 
|  | * bool | 
|  | * string | 
|  | * []byte | 
|  | */ | 
|  | func (w *SnapshotWriter) WriteRow(columnValues []interface{}) error { | 
|  | if !w.tableWriting { | 
|  | return errors.New("Cannot write a row because no table was started") | 
|  | } | 
|  | if len(columnValues) != w.numColumns { | 
|  | return errors.New("Write must include consistent number of columns") | 
|  | } | 
|  |  | 
|  | var columns []*ValuePb | 
|  | for _, v := range columnValues { | 
|  | col := convertParameter(v) | 
|  | colVal := &ValuePb{ | 
|  | Value: col, | 
|  | } | 
|  | columns = append(columns, colVal) | 
|  | } | 
|  |  | 
|  | row := &RowPb{ | 
|  | Values: columns, | 
|  | } | 
|  | rowMsg := &StreamMessagePb_Row{ | 
|  | Row: row, | 
|  | } | 
|  | msg := &StreamMessagePb{ | 
|  | Message: rowMsg, | 
|  | } | 
|  | return w.writeProto(msg) | 
|  | } | 
|  |  | 
|  | func (w *SnapshotWriter) writeProto(msg proto.Message) error { | 
|  | buf, err := proto.Marshal(msg) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | bufLen := int32(len(buf)) | 
|  | err = binary.Write(w.writer, networkByteOrder, bufLen) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | _, err = w.writer.Write(buf) | 
|  | return err | 
|  | } | 
|  |  | 
/*
A SnapshotReader reads a snapshot produced by a SnapshotWriter.
The snapshot is read one table and one row at a time by calling "Next()".
*/
type SnapshotReader struct {
	// reader is the stream the length-prefixed records are read from.
	reader io.Reader
	// timestamp and snapshot are copied from the snapshot header when the
	// reader is created.
	timestamp string
	snapshot  string
	// curBuf holds the raw bytes of the record most recently read by Next.
	curBuf []byte
	// savedErr is a read error captured by Next; Entry returns it.
	savedErr error
	// curTable is the most recent table header decoded by Entry. Its column
	// list is used to name and type the values of the rows that follow.
	curTable *TableInfo
}
|  |  | 
|  | /* | 
|  | CreateSnapshotReader creates a reader. | 
|  | */ | 
|  | func CreateSnapshotReader(r io.Reader) (*SnapshotReader, error) { | 
|  | rdr := &SnapshotReader{ | 
|  | reader: r, | 
|  | } | 
|  |  | 
|  | buf, err := rdr.readBuf() | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  |  | 
|  | var hdrPb SnapshotHeaderPb | 
|  | err = proto.Unmarshal(buf, &hdrPb) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | rdr.snapshot = hdrPb.GetSnapshot() | 
|  | rdr.timestamp = hdrPb.GetTimestamp() | 
|  | return rdr, nil | 
|  | } | 
|  |  | 
/*
Timestamp returns the time (in postgres "now()" format) when the snapshot
was created, as stored in the snapshot header that was read when this
reader was created.
*/
func (r *SnapshotReader) Timestamp() string {
	return r.timestamp
}
|  |  | 
/*
SnapshotInfo returns the information from "txid_current_snapshot()", as
stored in the snapshot header that was read when this reader was created.
*/
func (r *SnapshotReader) SnapshotInfo() string {
	return r.snapshot
}
|  |  | 
|  | /* | 
|  | Next positions the snapshot reader on the next record. To read the snapshot, | 
|  | a reader must first call "Next," then call "Entry" to get the entry | 
|  | for processing. The reader should continue this process until Next | 
|  | returns "false." | 
|  | */ | 
|  | func (r *SnapshotReader) Next() bool { | 
|  | buf, err := r.readBuf() | 
|  | if err == io.EOF { | 
|  | return false | 
|  | } else if err != nil { | 
|  | r.savedErr = err | 
|  | return true | 
|  | } | 
|  | r.curBuf = buf | 
|  | return true | 
|  | } | 
|  |  | 
|  | /* | 
|  | Entry returns the current entry. It can be one of three things: | 
|  | 1) A TableInfo, which tells us to start writing to a new table, or | 
|  | 2) a Row, which denotes what you think it does. | 
|  | 3) an error, which indicates that we got incomplete data. | 
|  | It is an error to call this function if "Next" was never called. | 
|  | It is also an error to call this function once "Next" returned false. | 
|  | Finally, it is an error to continue processing after this function | 
|  | has returned an error. | 
|  | */ | 
|  | func (r *SnapshotReader) Entry() interface{} { | 
|  | if r.savedErr != nil { | 
|  | return r.savedErr | 
|  | } | 
|  | if r.curBuf == nil { | 
|  | return errors.New("Incorrect call sequence") | 
|  | } | 
|  |  | 
|  | var msg StreamMessagePb | 
|  | err := proto.Unmarshal(r.curBuf, &msg) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  |  | 
|  | row := msg.GetRow() | 
|  | table := msg.GetTable() | 
|  |  | 
|  | if table == nil { | 
|  | if r.curTable == nil { | 
|  | return errors.New("Invalid stream: Got rows before table") | 
|  | } | 
|  | if len(r.curTable.Columns) != len(row.Values) { | 
|  | return errors.New("Invalid stream: Received incorrect number of columns") | 
|  | } | 
|  |  | 
|  | ri := make(map[string]*ColumnVal) | 
|  | for i, col := range r.curTable.Columns { | 
|  | cv := &ColumnVal{ | 
|  | Type:  col.Type, | 
|  | Value: unwrapColumnVal(row.Values[i]), | 
|  | } | 
|  | ri[col.Name] = cv | 
|  | } | 
|  | return Row(ri) | 
|  | } | 
|  |  | 
|  | // Return information on the new table, plus keep it for column processing | 
|  | var cols []ColumnInfo | 
|  | for _, ti := range table.GetColumns() { | 
|  | col := ColumnInfo{ | 
|  | Name: ti.GetName(), | 
|  | Type: ti.GetType(), | 
|  | } | 
|  | cols = append(cols, col) | 
|  | } | 
|  | ti := TableInfo{ | 
|  | Name:    table.GetName(), | 
|  | Columns: cols, | 
|  | } | 
|  | r.curTable = &ti | 
|  | return ti | 
|  | } | 
|  |  | 
|  | func (r *SnapshotReader) readBuf() ([]byte, error) { | 
|  | var bufLen int32 | 
|  | err := binary.Read(r.reader, networkByteOrder, &bufLen) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  |  | 
|  | buf := make([]byte, bufLen) | 
|  | _, err = io.ReadFull(r.reader, buf) | 
|  | if err == nil { | 
|  | return buf, nil | 
|  | } | 
|  | return nil, err | 
|  | } |