| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package splitable
- import (
- "database/sql"
- "fmt"
- "reflect"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/reflectutils"
- "yunion.io/x/pkg/util/timeutils"
- "yunion.io/x/sqlchemy"
- )
- func (t *SSplitTableSpec) getLastTableSpecWithLock(lastDate time.Time) (*sqlchemy.STableSpec, error) {
- t.lastTableLock.Lock()
- defer t.lastTableLock.Unlock()
- now := time.Now()
- if t.lastTableSpec != nil && !t.lastTableExpire.IsZero() && t.lastTableExpire.Before(now) {
- return t.lastTableSpec, nil
- }
- lastTableSpec, err := t.getLastTableSpec(lastDate)
- if err != nil {
- return nil, errors.Wrap(err, "getLastTableSpec")
- }
- t.lastTableSpec = lastTableSpec
- t.lastTableExpire = now.Add(time.Hour * lastTableSpecExpireHours)
- return t.lastTableSpec, nil
- }
- func (t *SSplitTableSpec) getLastTableSpec(lastDate time.Time) (*sqlchemy.STableSpec, error) {
- lastMeta, err := t.getTableLastMeta()
- if err != nil && errors.Cause(err) != sql.ErrNoRows {
- return nil, errors.Wrap(err, "GetTableMeta")
- }
- var lastRecIndex int64
- var lastRecDate time.Time
- var lastTableSpec *sqlchemy.STableSpec
- newMeta := false
- if lastMeta != nil {
- if !lastMeta.StartDate.IsZero() && lastDate.Sub(lastMeta.StartDate) > t.maxDuration {
- lastTable := t.GetTableSpec(*lastMeta)
- ti := lastTable.Instance()
- q := ti.Query(sqlchemy.MAX("last_index", ti.Field(t.indexField)), sqlchemy.MAX("last_date", ti.Field(t.dateField)), sqlchemy.COUNT("total"))
- r := q.Row()
- var lastRecDateStr string
- var total uint64
- err := r.Scan(&lastRecIndex, &lastRecDateStr, &total)
- if err != nil {
- return nil, errors.Wrap(err, "scan lastRecIndex and lastRecDate")
- }
- log.Debugf("lastRecDateStr: %s", lastRecDateStr)
- lastRecDate, _ = timeutils.ParseTimeStr(lastRecDateStr)
- // seal last meta
- _, err = t.metaSpec.Update(lastMeta, func() error {
- lastMeta.End = lastRecIndex
- lastMeta.EndDate = lastRecDate
- lastMeta.Count = total
- return nil
- })
- if err != nil {
- return nil, errors.Wrap(err, "Update last meta")
- }
- newMeta = true
- } else {
- if lastMeta.StartDate.IsZero() {
- indexCol := t.tableSpec.ColumnSpec(t.indexField)
- startDate := lastMeta.CreatedAt
- if startDate.IsZero() {
- startDate = lastDate
- }
- _, err = t.metaSpec.Update(lastMeta, func() error {
- lastMeta.Start = indexCol.AutoIncrementOffset()
- lastMeta.StartDate = startDate
- return nil
- })
- if err != nil {
- return nil, errors.Wrap(err, "Update last meta")
- }
- }
- lastTableSpec = t.GetTableSpec(*lastMeta)
- }
- } else {
- newMeta = true
- }
- if newMeta {
- lastTableSpec, err = t.newTable(lastRecIndex, lastDate)
- if err != nil {
- return nil, errors.Wrap(err, "newTable")
- }
- }
- return lastTableSpec, nil
- }
- func (t *SSplitTableSpec) Insert(dt interface{}) error {
- var lastDate time.Time
- dataValue := reflect.Indirect(reflect.ValueOf(dt))
- vs := reflectutils.FetchAllStructFieldValueSet(dataValue)
- if lastDateV, ok := vs.GetValue(t.dateField); !ok {
- return errors.Wrap(errors.ErrInvalidStatus, "no dateField found")
- } else {
- lastDate = lastDateV.Interface().(time.Time)
- }
- if lastDate.IsZero() {
- lastDate = time.Now()
- succ := reflectutils.SetStructFieldValue(dataValue, t.dateField, reflect.ValueOf(lastDate))
- if !succ {
- return errors.Wrap(errors.ErrInvalidStatus, "set dateField failed")
- }
- }
- lastTableSpec, err := t.getLastTableSpecWithLock(lastDate)
- if err != nil {
- return errors.Wrap(err, "getLastTableSpec")
- }
- return lastTableSpec.Insert(dt)
- }
- func (t *SSplitTableSpec) newTable(lastRecIndex int64, lastDate time.Time) (*sqlchemy.STableSpec, error) {
- // insert a new metadata
- meta := STableMetadata{
- Table: fmt.Sprintf("%s_%d", t.tableName, lastDate.Unix()),
- }
- if lastRecIndex > 0 {
- // auto_increment offset should consider HA setup
- meta.Start = lastRecIndex + 10000
- meta.StartDate = lastDate
- }
- err := t.metaSpec.Insert(&meta)
- if err != nil {
- return nil, errors.Wrap(err, "insert new meta")
- }
- // create new table
- newTable := t.GetTableSpec(meta)
- err = newTable.Sync()
- if err != nil {
- return nil, errors.Wrap(err, "sync new table")
- }
- return newTable, nil
- }
|