updates on dependencies + multithreading fixes #2

Merged
paulbsd merged 3 commits from develop into master 2023-09-14 15:42:50 +02:00
3 changed files with 40 additions and 35 deletions
Showing only changes of commit 9bea58a890 - Show all commits

View File

@ -40,7 +40,7 @@ func main() {
go models.ScanIP(&cfg) go models.ScanIP(&cfg)
} }
if cfg.Switchs.Debug { if cfg.Options.Debug {
go func() { go func() {
log.Println(http.ListenAndServe("localhost:6060", nil)) log.Println(http.ListenAndServe("localhost:6060", nil))
}() }()

View File

@ -10,7 +10,6 @@ import (
func (cfg *Config) GetConfig() error { func (cfg *Config) GetConfig() error {
var configfile string var configfile string
var debug bool
var drop bool var drop bool
var init bool var init bool
var port int var port int
@ -21,7 +20,6 @@ func (cfg *Config) GetConfig() error {
flag.StringVar(&configfile, "configfile", "ipbl.ini", "Configuration file to use with ipbl section") flag.StringVar(&configfile, "configfile", "ipbl.ini", "Configuration file to use with ipbl section")
flag.IntVar(&port, "port", 8099, "Web service port to use") flag.IntVar(&port, "port", 8099, "Web service port to use")
flag.BoolVar(&debug, "debug", false, "If debug logging must be enabled")
flag.BoolVar(&drop, "drop", false, "If dropping tables must occur") flag.BoolVar(&drop, "drop", false, "If dropping tables must occur")
flag.BoolVar(&init, "init", false, "If init of database must be done") flag.BoolVar(&init, "init", false, "If init of database must be done")
flag.BoolVar(&version, "version", false, "Show version") flag.BoolVar(&version, "version", false, "Show version")
@ -29,7 +27,6 @@ func (cfg *Config) GetConfig() error {
flag.Parse() flag.Parse()
cfg.Switchs.Debug = debug
cfg.Switchs.Drop = drop cfg.Switchs.Drop = drop
cfg.Switchs.Init = init cfg.Switchs.Init = init
cfg.Switchs.Port = port cfg.Switchs.Port = port
@ -50,6 +47,7 @@ func (cfg *Config) GetConfig() error {
cfg.Options.SocketChannel = "ipbl" cfg.Options.SocketChannel = "ipbl"
cfg.Options.HideBanner = ipblsection.Key("hidebanner").MustBool(false) cfg.Options.HideBanner = ipblsection.Key("hidebanner").MustBool(false)
cfg.Options.Debug = ipblsection.Key("debug").MustBool(false)
return nil return nil
} }
@ -66,11 +64,11 @@ type Config struct {
Version string Version string
HideBanner bool HideBanner bool
SocketChannel string SocketChannel string
Debug bool
} }
Switchs struct { Switchs struct {
Port int Port int
NoFeed bool NoFeed bool
Debug bool
Drop bool Drop bool
Init bool Init bool
Version bool Version bool

View File

@ -8,6 +8,7 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
"runtime"
"sync" "sync"
"time" "time"
@ -131,21 +132,30 @@ func InsertIPBulk(session *xorm.Session, ips *[]IP) (numinsert int64, numupdate
} }
func ScanIP(cfg *config.Config) (err error) { func ScanIP(cfg *config.Config) (err error) {
var numthreads = 8 var numthreads int = runtime.NumCPU() / 2
for { for {
session := cfg.Db.NewSession() session := cfg.Db.NewSession()
orphans := []IP{} orphans := []IP{}
err = session.Where(`((as_id IS NULL err = session.Where(`
OR country_id IS NULL (
OR city_id IS NULL (
OR rdns = '' as_id IS NULL
OR rdns IS NULL) AND updated < now()-'1d'::interval) OR OR country_id IS NULL
(as_id IS NULL OR city_id IS NULL
AND country_id IS NULL OR rdns = ''
AND city_id IS NULL OR rdns IS NULL
AND rdns IS NULL) )
`).Desc("updated").Limit(SCANLIMIT).Find(&orphans) AND updated < now()-'1d'::interval
session.Close() )
OR
(
as_id IS NULL
AND country_id IS NULL
AND city_id IS NULL
AND rdns IS NULL
)
`).Desc("updated").Limit(SCANLIMIT).Find(&orphans)
defer session.Close()
if err == nil && len(orphans) > 0 { if err == nil && len(orphans) > 0 {
orphanchan := make(chan IP) orphanchan := make(chan IP)
@ -161,10 +171,9 @@ func ScanIP(cfg *config.Config) (err error) {
} }
close(orphanchan) close(orphanchan)
<-done
wg.Wait() wg.Wait()
} else { } else {
time.Sleep(30 * time.Second) time.Sleep(1 * time.Second)
} }
} }
} }
@ -172,19 +181,21 @@ func ScanIP(cfg *config.Config) (err error) {
func ScanOrphan(wg *sync.WaitGroup, orphans chan IP, done chan bool, thr int, cfg *config.Config) (err error) { func ScanOrphan(wg *sync.WaitGroup, orphans chan IP, done chan bool, thr int, cfg *config.Config) (err error) {
wg.Add(1) wg.Add(1)
session := cfg.Db.NewSession()
defer session.Close()
queryclient := http.Client{}
for { for {
orphan, more := <-orphans orphan, more := <-orphans
if more { if more {
session := cfg.Db.NewSession()
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
queryclient := http.Client{}
var query QueryIP var query QueryIP
query, err := QueryInfo(&queryclient, orphan.IP) query, err := QueryInfo(&queryclient, orphan.IP)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
time.Sleep(10 * time.Minute) time.Sleep(1 * time.Minute)
continue continue
} }
@ -202,8 +213,8 @@ func ScanOrphan(wg *sync.WaitGroup, orphans chan IP, done chan bool, thr int, cf
} }
orphan.Rdns = sql.NullString{String: query.Rdns, Valid: true} orphan.Rdns = sql.NullString{String: query.Rdns, Valid: true}
if cfg.Switchs.Debug { if cfg.Options.Debug {
log.Printf("%s -> \"%s\"\n", orphan.IP, query.Rdns) log.Printf("t%d: %s -> \"%s\"\n", thr, orphan.IP, query.Rdns)
} }
_, err = orphan.GetOrCreate(session) _, err = orphan.GetOrCreate(session)
@ -211,21 +222,17 @@ func ScanOrphan(wg *sync.WaitGroup, orphans chan IP, done chan bool, thr int, cf
continue continue
} }
err = session.Commit()
if err != nil {
log.Println(err)
}
err = session.Close()
if err != nil {
log.Println(err)
}
} else { } else {
//fmt.Printf("All orphan migrated on thread num %d\n", thr) fmt.Printf("All orphan migrated on thread num %d\n", thr)
wg.Done() wg.Done()
done <- true break
return nil
} }
} }
err = session.Commit()
if err != nil {
log.Println(err)
}
return nil
} }
func QueryInfo(client *http.Client, ip string) (query QueryIP, err error) { func QueryInfo(client *http.Client, ip string) (query QueryIP, err error) {