From 94823e104d813f01d1e15c7a5557e7404b9dacbc Mon Sep 17 00:00:00 2001 From: Aaron Lindsay Date: Fri, 22 Feb 2013 00:06:10 -0500 Subject: [PATCH] Add server database, returning events --- api.go | 14 +++++- client/asink.go | 44 ++++++++--------- client/database.go | 4 +- events.go | 21 ++++---- server/database.go | 119 +++++++++++++++++++++++++++++++++++++++++++++ server/server.go | 62 ++++++++++++++++++----- 6 files changed, 215 insertions(+), 49 deletions(-) create mode 100644 server/database.go diff --git a/api.go b/api.go index 3a79b37..0b9fd0f 100644 --- a/api.go +++ b/api.go @@ -1,6 +1,18 @@ package asink +type APIStatus uint32 + +const ( + SUCCESS = 0 + iota + ERROR +) + type APIResponse struct { - Status string //may be 'error' or 'success' + Status APIStatus Explanation string + Events []*Event +} + +type EventList struct { + Events []*Event } diff --git a/client/asink.go b/client/asink.go index b8b0ddb..b27b21a 100644 --- a/client/asink.go +++ b/client/asink.go @@ -7,10 +7,11 @@ import ( "code.google.com/p/goconf/conf" "database/sql" "encoding/json" + "errors" "flag" "fmt" - "net/http" "io/ioutil" + "net/http" "os" "os/user" "path" @@ -45,7 +46,6 @@ func init() { func main() { flag.Parse() - fmt.Println("config file:", globals.configFileName) config, err := conf.ReadConfigFile(globals.configFileName) if err != nil { @@ -78,13 +78,6 @@ func main() { panic(err) } - //TODO FIXME REMOVEME - fmt.Println(globals.syncDir) - fmt.Println(globals.cacheDir) - fmt.Println(globals.tmpDir) - fmt.Println(globals.storage) - //TODO FIXME REMOVEME - globals.server, err = config.GetString("server", "host") globals.port, err = config.GetInt("server", "port") @@ -94,7 +87,6 @@ func main() { globals.db, err = GetAndInitDB(config) if err != nil { panic(err) - return } for { @@ -158,6 +150,19 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { } //finally, send it off to the server + err = SendEvent(globals, event) + if err != nil { + panic(err) //TODO handle sensibly + } + + event.Status |= asink.ON_SERVER + err = DatabaseUpdateEvent(globals.db, event) + if err != nil { + panic(err) + } +} + +func SendEvent(globals AsinkGlobals, event *asink.Event) error { url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" //construct json payload @@ -166,36 +171,31 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { } b, err := json.Marshal(events) if err != nil { - panic(err) + return err } fmt.Println(string(b)) //actually make the request resp, err := http.Post(url, "application/json", bytes.NewReader(b)) if err != nil { - panic(err) + return err } defer resp.Body.Close() //check to make sure request succeeded body, err := ioutil.ReadAll(resp.Body) if err != nil { - panic(err) + return err } var apistatus asink.APIResponse err = json.Unmarshal(body, &apistatus) if err != nil { - panic(err) //TODO handle sensibly + return err } - if apistatus.Status != "success" { - panic("Status not success") //TODO handle sensibly + if apistatus.Status != asink.SUCCESS { + return errors.New("API response was not success") } - fmt.Println(apistatus) - event.Status |= asink.ON_SERVER - err = DatabaseUpdateEvent(globals.db, event) - if err != nil { - panic(err) - } + return nil } diff --git a/client/database.go b/client/database.go index efebe3e..e4eefb1 100644 --- a/client/database.go +++ b/client/database.go @@ -49,7 +49,7 @@ func DatabaseAddEvent(db *sql.DB, e *asink.Event) error { if err != nil { return err } - result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, 0) + result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions) if err != nil { return err } @@ -76,7 +76,7 @@ func DatabaseUpdateEvent(db *sql.DB, e *asink.Event) error { if err != nil { return err } - result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, 0, e.LocalId) + result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions, e.LocalId) if err != nil { return err } diff --git a/events.go b/events.go index 02848ba..98c6e27 100644 --- a/events.go +++ b/events.go @@ -21,14 +21,15 @@ const ( ) type Event struct { - Id int64 - LocalId int64 - Type EventType - Status EventStatus - Path string - Hash string - Timestamp int64 - InDB bool `json:"-"` //defaults to false. Omitted from json marshalling. + Id int64 + LocalId int64 + Type EventType + Status EventStatus + Path string + Hash string + Timestamp int64 + Permissions uint32 + InDB bool `json:"-"` //defaults to false. Omitted from json marshalling. } func (e Event) IsUpdate() bool { @@ -38,7 +39,3 @@ func (e Event) IsUpdate() bool { func (e Event) IsDelete() bool { return e.Type&DELETE == DELETE } - -type EventList struct { - Events []*Event -} diff --git a/server/database.go b/server/database.go new file mode 100644 index 0000000..838d251 --- /dev/null +++ b/server/database.go @@ -0,0 +1,119 @@ +package main + +import ( + "asink" + "database/sql" + "errors" + _ "github.com/mattn/go-sqlite3" + "strconv" +) + +func GetAndInitDB() (*sql.DB, error) { + dbLocation := "asink-server.db" //TODO make me configurable + + db, err := sql.Open("sqlite3", dbLocation) + if err != nil { + return nil, err + } + + //make sure the events table is created + tx, err := db.Begin() + if err != nil { + return nil, err + } + rows, err := tx.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='events';") + if err != nil { + return nil, err + } + if !rows.Next() { + //if this is false, it means no rows were returned + tx.Exec("CREATE TABLE events (id INTEGER PRIMARY KEY ASC, localid INTEGER, type INTEGER, status INTEGER, path TEXT, hash TEXT, timestamp INTEGER, permissions INTEGER);") + tx.Exec("CREATE INDEX IF NOT EXISTS pathidx on events (path);") + } + err = tx.Commit() + if err != nil { + return nil, err + } + + return db, nil +} + +func DatabaseAddEvent(db *sql.DB, e *asink.Event) error { + tx, err := db.Begin() + if err != nil { + return err + } + result, err := tx.Exec("INSERT INTO events (localid, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.LocalId, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions) + if err != nil { + return err + } + id, err := result.LastInsertId() + if err != nil { + return err + } + err = tx.Commit() + if err != nil { + return err + } + + e.Id = id + e.InDB = true + return nil +} + +func DatabaseUpdateEvent(db *sql.DB, e *asink.Event) error { + if !e.InDB { + return errors.New("Attempting to update an event in the database which hasn't been previously added.") + } + + tx, err := db.Begin() + if err != nil { + return err + } + result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE id=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions, e.Id) + if err != nil { + return err + } + rows, err := result.RowsAffected() + if err != nil { + return err + } + if rows != 1 { + return errors.New("Updated " + strconv.Itoa(int(rows)) + " row(s) when intending to update 1 event row.") + } + err = tx.Commit() + if err != nil { + return err + } + + return nil +} + +func DatabaseRetrieveEvents(db *sql.DB, firstId uint64, maxEvents uint) ([]*asink.Event, error) { + var events []*asink.Event + + tx, err := db.Begin() + if err != nil { + return nil, err + } + + rows, err := tx.Query("SELECT id, localid, type, status, path, hash, timestamp, permissions FROM events WHERE id >= ? ORDER BY id ASC LIMIT ?;", firstId, maxEvents) + if err != nil { + return nil, err + } + for rows.Next() { + var event asink.Event + err = rows.Scan(&event.Id, &event.LocalId, &event.Type, &event.Status, &event.Path, &event.Hash, &event.Timestamp, &event.Permissions) + if err != nil { + return nil, err + } + events = append(events, &event) + } + + err = tx.Commit() + if err != nil { + return nil, err + } + + return events, nil +} diff --git a/server/server.go b/server/server.go index 7890125..050735d 100644 --- a/server/server.go +++ b/server/server.go @@ -2,6 +2,7 @@ package main import ( "asink" + "database/sql" "encoding/json" "flag" "fmt" @@ -12,15 +13,22 @@ import ( ) var eventsRegexp *regexp.Regexp - var port int = 8080 +var db *sql.DB + func init() { + var err error const port_usage = "Port on which to serve HTTP API" flag.IntVar(&port, "port", 8080, port_usage) flag.IntVar(&port, "p", 8080, port_usage+" (shorthand)") eventsRegexp = regexp.MustCompile("^/events/([0-9]+)$") + + db, err = GetAndInitDB() + if err != nil { + panic(err) + } } func main() { @@ -42,28 +50,58 @@ func rootHandler(w http.ResponseWriter, r *http.Request) { } func getEvents(w http.ResponseWriter, r *http.Request, nextEvent uint64) { - fmt.Fprintf(w, strconv.FormatUint(nextEvent, 10)) -} - -func putEvents(w http.ResponseWriter, r *http.Request) { - var events asink.EventList - var error_occurred bool = false + var events []*asink.Event var error_message string = "" defer func() { var apiresponse asink.APIResponse - if error_occurred { + if error_message != "" { apiresponse = asink.APIResponse{ - Status: "error", + Status: asink.ERROR, Explanation: error_message, } } else { apiresponse = asink.APIResponse{ - Status: "success", + Status: asink.SUCCESS, + Events: events, } } b, err := json.Marshal(apiresponse) if err != nil { - panic(err) + error_message = err.Error() + return + } + w.Write(b) + }() + + events, err := DatabaseRetrieveEvents(db, nextEvent, 50) + if err != nil { + panic(err) + error_message = err.Error() + return + } + + //TODO long-poll here if events is empty +} + +func putEvents(w http.ResponseWriter, r *http.Request) { + var events asink.EventList + var error_message string = "" + defer func() { + var apiresponse asink.APIResponse + if error_message != "" { + apiresponse = asink.APIResponse{ + Status: asink.ERROR, + Explanation: error_message, + } + } else { + apiresponse = asink.APIResponse{ + Status: asink.SUCCESS, + } + } + b, err := json.Marshal(apiresponse) + if err != nil { + error_message = err.Error() + return } w.Write(b) }() @@ -79,7 +117,7 @@ func putEvents(w http.ResponseWriter, r *http.Request) { return } for _, event := range events.Events { - fmt.Println(event) + DatabaseAddEvent(db, event) } }